package day02.transform;

import beans.SensorReading;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import scala.Tuple2;

/**
 * Flink 流处理 API - 富函数（Rich Functions）
 *
 * @author lvbingbing
 * @date 2021-12-14 21:03
 */
public class FlinkTransform06 {
    public static void main(String[] args) throws Exception {
        // 1、创建FlinkTransform00对象，有参构造会初始化 env，并从文件中读取数据
        int parallelism = 4;
        FlinkTransform00 flinkTransform = new FlinkTransform00(parallelism);
        // 2、获取执行环境
        StreamExecutionEnvironment env = flinkTransform.getEnv();
        // 3、学习富函数
        studyRichFunctions(flinkTransform.getSensorReadingStream());
        // 4、触发程序执行
        env.execute();
    }

    /**
     * 学习富函数（Rich Functions）<br>
     * “富函数” 是 DataStream API 提供的一个函数类的接口，所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于，
     * 可以获取运行环境的上下文，并拥有一些生命周期方法，所以可以实现更复杂的功能。
     * <p>
     * ⚫ RichMapFunction<br>
     * ⚫ RichFlatMapFunction<br>
     * ⚫ RichFilterFunction<br>
     * ⚫ …<br>
     * Rich Function 有一个生命周期的概念。典型的生命周期方法有：<br>
     * ⚫ open()方法是 rich function 的初始化方法，当一个算子例如 map 或者 filter 被调用之前 open()会被调用。<br>
     * ⚫ close()方法是生命周期中的最后一个调用的方法，做一些清理工作。<br>
     * ⚫ getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息，例如函数执行的并行度，任务的名字，以及 state 状态
     *
     * @param sensorReadingStream <br>
     */
    private static void studyRichFunctions(DataStream<SensorReading> sensorReadingStream) {
        // 普通的匿名mapFunction
        SingleOutputStreamOperator<String> simpleMapFunctionStream = sensorReadingStream.map(SensorReading::getId);
        // 自定义实现 RichMapFunction
        DataStream<Tuple2<String, Integer>> mapRichFunctionStream = sensorReadingStream.map(new MyRichMapFunction());
        mapRichFunctionStream.print("mapRichFunctionStream");
    }

    /**
     * 自定义实现 RichMapFunction
     */
    public static class MyRichMapFunction extends RichMapFunction<SensorReading, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(SensorReading sensorReading) {
            // 获取运行时环境
            RuntimeContext context = getRuntimeContext();
            // 获取此并行子任务的编号
            int indexOfThisSubtask = context.getIndexOfThisSubtask();
            return new Tuple2<>(sensorReading.getId(), indexOfThisSubtask);
        }

        /**
         * 初始化方法，一般定义状态，或者建立数据库连接
         *
         * @param parameters 配置信息
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            System.out.println("initialization method for the function");
        }

        /**
         * 一般是关闭连接，清空状态的收尾工作
         */
        @Override
        public void close() throws Exception {
            super.close();
            System.out.println("clean up work.");
        }
    }
}
